package com.innovation.ic.cc.base.thread.listener.rabbitmq;

import com.innovation.ic.cc.base.pojo.constant.handler.RabbitMqConstants;
import com.innovation.ic.cc.base.service.cc.InquiryPriceService;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @author zengqinglong
 * @desc 询价管理 消息延迟队列
 * @Date 2023/4/28 15:40
 **/
public class ListenInquiryManagementDelayQueueThread extends AbstractRabbitmqThread implements Runnable {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    private Channel channel;

    private InquiryPriceService inquiryPriceService;


    public ListenInquiryManagementDelayQueueThread(Channel channel,InquiryPriceService inquiryPriceService) {
        this.channel = channel;
        this.inquiryPriceService = inquiryPriceService;
    }

    @Override
    @SneakyThrows
    public void run() {
        String exchange = RabbitMqConstants.INQUIRY_DElAY_EXCHANGE;
        String routingKey = RabbitMqConstants.INQUIRY_DElAY_ROUTING_KEY;
        String queue = RabbitMqConstants.INQUIRY_DElAY_QUEUE;
        // 延迟交换机参数
        Map<String, Object> delayParams = getNormalAndDeadParams();
        // 声明一个队列与交换机及绑定关系
        handleQueueAndBinding(channel, queue, delayParams, exchange, routingKey);
        channel.basicQos(1);
        //开启监听Queue
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                //处理具体逻辑
                inquiryPriceService.handleMsg(msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(RabbitMqConstants.INQUIRY_DElAY_QUEUE, false, consumer);
    }

    private static Map<String, Object> getNormalAndDeadParams() {
        Map<String, Object> argMap = new HashMap<String, Object>();
        argMap.put("x-delayed-type", "direct");
        return argMap;
    }

    /**
     * 处理队列与绑定关系
     *
     * @param channel
     * @param deadQueueName
     * @param o
     * @param exchangeName
     * @param routingKey
     * @throws IOException
     */
    private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map<String, Object> o, String exchangeName, String routingKey) throws IOException {
        channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, o);
        channel.queueDeclare(deadQueueName, true, false, false, new HashMap<String, Object>());
        channel.queueBind(deadQueueName, exchangeName, routingKey);
    }
}
